/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.unidata.mdm.data.service.impl;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;

import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.unidata.mdm.core.type.timeline.TimeInterval;
import org.unidata.mdm.core.type.timeline.Timeline;
import org.unidata.mdm.data.context.DeleteRequestContext;
import org.unidata.mdm.data.context.GetMultipleRequestContext;
import org.unidata.mdm.data.context.GetRecordTimelineRequestContext;
import org.unidata.mdm.data.context.GetRecordsTimelinesRequestContext;
import org.unidata.mdm.data.context.GetRequestContext;
import org.unidata.mdm.data.context.JoinRequestContext;
import org.unidata.mdm.data.context.MergeRequestContext;
import org.unidata.mdm.data.context.PreviewRequestContext;
import org.unidata.mdm.data.context.RecordIdentityContext;
import org.unidata.mdm.data.context.RestoreRecordRequestContext;
import org.unidata.mdm.data.context.SplitRecordRequestContext;
import org.unidata.mdm.data.context.UpsertRequestContext;
import org.unidata.mdm.data.dto.DeleteRecordDTO;
import org.unidata.mdm.data.dto.GetRecordDTO;
import org.unidata.mdm.data.dto.GetRecordsDTO;
import org.unidata.mdm.data.dto.KeysJoinDTO;
import org.unidata.mdm.data.dto.MergeRecordsDTO;
import org.unidata.mdm.data.dto.RecordsBulkResultDTO;
import org.unidata.mdm.data.dto.RestoreRecordDTO;
import org.unidata.mdm.data.dto.SplitRecordsDTO;
import org.unidata.mdm.data.dto.UpsertRecordDTO;
import org.unidata.mdm.data.service.DataRecordsService;
import org.unidata.mdm.data.service.segments.records.batch.RecordsDeleteConnectorExecutor;
import org.unidata.mdm.data.service.segments.records.batch.RecordsMergeConnectorExecutor;
import org.unidata.mdm.data.service.segments.records.batch.RecordsRestoreConnectorExecutor;
import org.unidata.mdm.data.service.segments.records.batch.RecordsUpsertConnectorExecutor;
import org.unidata.mdm.data.type.apply.batch.impl.RecordDeleteBatchSetAccumulator;
import org.unidata.mdm.data.type.apply.batch.impl.RecordMergeBatchSetAccumulator;
import org.unidata.mdm.data.type.apply.batch.impl.RecordRestoreBatchSetAccumulator;
import org.unidata.mdm.data.type.apply.batch.impl.RecordUpsertBatchSetAccumulator;
import org.unidata.mdm.data.type.data.OriginRecord;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.meta.util.ValidityPeriodUtils;
import org.unidata.mdm.system.service.ExecutionService;
import org.unidata.mdm.system.type.batch.BatchSetSize;

/**
 * The Class DataRecordsServiceImpl.
 *
 * @author Mikhail Mikhailov Data service.
 */
@Service
public class DataRecordsServiceImpl implements DataRecordsService {
    /**
     * Batch upsert executor.
     */
    @Autowired
    @Qualifier(RecordsUpsertConnectorExecutor.SEGMENT_ID)
    private RecordsUpsertConnectorExecutor recordsUpsertConnectorExecutor;
    /**
     * Batch delete connector executor.
     */
    @Autowired
    @Qualifier(RecordsDeleteConnectorExecutor.SEGMENT_ID)
    private RecordsDeleteConnectorExecutor recordsDeleteConnectorExecutor;
    /**
     * Batch merge connector executor.
     */
    @Autowired
    @Qualifier(RecordsMergeConnectorExecutor.SEGMENT_ID)
    private RecordsMergeConnectorExecutor recordsMergeConnectorExecutor;
    /**
     * Restore connector executor.
     */
    @Autowired
    @Qualifier(RecordsRestoreConnectorExecutor.SEGMENT_ID)
    private RecordsRestoreConnectorExecutor recordsRestoreConnectorExecutor;
    /**
     * Records component.
     */
    @Autowired
    private CommonRecordsComponent commonRecordsComponent;
    /**
     * The real working horse of this service.
     */
    @Autowired
    private ExecutionService executionService;

    // FIXME Move the whole stuff to a Fallback type?
    /*
    @Value("${unidata.search.index.refresh_interval:1000}")
    private long rollBackDelay;

    @Value("${unidata.data.rollback.thread.count:1}")
    private int rollBackPoolSize;

    private final ScheduledExecutorService rollBackExecutor = Executors.newScheduledThreadPool(
            rollBackPoolSize,
            new CustomizableThreadFactory("rollbackRecord-worker-")
    );

    @PreDestroy
    public void preDestroy() {
        rollBackExecutor.shutdown();
    }
    */
    /**
     * Default Spring ctor.
     */
    public DataRecordsServiceImpl() {
        super();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public GetRecordDTO getRecord(GetRequestContext ctx) {
        return executionService.execute(ctx);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordKeys identify(RecordIdentityContext ctx) {
        return commonRecordsComponent.identify(ctx);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public KeysJoinDTO join(JoinRequestContext ctx) {
        // TODO: @Modules
//        final Optional<JoinImpl> join = Optional.ofNullable(configurationService.getJoin());
//
//        join.ifPresent(j ->
//                Optional.ofNullable(configurationService.getListeners(ctx.getEntityName(), j.getBeforeJoinInstances()))
//                        .ifPresent(beforeJoinListeners ->
//                                beforeJoinListeners.forEach(beforeJoinListener -> Optional.ofNullable(beforeJoinListener.beforeJoin(ctx))
//                                        .ifPresent(exitResult -> {
//                                            if (ExitResult.Status.ERROR == exitResult.getStatus()) {
//                                                throw new BusinessException(
//                                                        "Error occurred during run before join user exit: " + exitResult.getWarningMessage(),
//                                                        ExceptionId.EX_JOIN_USER_EXIT_BEFORE_ERROR,
//                                                        exitResult.getWarningMessage()
//                                                );
//                                            }
//                                        }))));
//
//        final KeysJoinDTO joinDTO = commonComponent.join(ctx);
//
//        join.ifPresent(j ->
//                Optional.ofNullable(j.getAfterJoinInstances().get(ctx.getEntityName()))
//                        .ifPresent(afterJoinListeners ->
//                                afterJoinListeners.forEach(afterJoinListener -> Optional.ofNullable(afterJoinListener.afterJoin(joinDTO))
//                                        .ifPresent(exitResult -> {
//                                            if (ExitResult.Status.ERROR == exitResult.getStatus()) {
//                                                throw new BusinessException(
//                                                        "Error occurred during run after join user exit: " + exitResult.getWarningMessage(),
//                                                        ExceptionId.EX_JOIN_USER_EXIT_AFTER_ERROR,
//                                                        exitResult.getWarningMessage()
//                                                );
//                                            }
//                                        }))));
//
//        return joinDTO;
        return null;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public GetRecordDTO preview(PreviewRequestContext ctx) {
        return executionService.execute(ctx);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public GetRecordsDTO getRecords(GetMultipleRequestContext ctx) {
        // TODO: @Modules
//        try {
//            GetRecordsDTO result = recordsComponent.loadRecords(ctx);
//            ctx.getInnerGetContexts().forEach(tx -> auditEventsWriter.writeSuccessEvent(AuditActions.DATA_GET, tx));
//            return result;
//        } catch (Exception e) {
//            ctx.getInnerGetContexts()
//                    .forEach(tx -> auditEventsWriter.writeUnsuccessfulEvent(AuditActions.DATA_GET, e, tx));
//            throw e;
//        }
        return null;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    @Transactional
    public DeleteRecordDTO deleteRecord(DeleteRequestContext ctx) {
        return executionService.execute(ctx);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    @Transactional
    public MergeRecordsDTO merge(MergeRequestContext ctx) {
        return executionService.execute(ctx);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public Timeline<OriginRecord> loadTimeline(GetRecordTimelineRequestContext ctx) {
        return commonRecordsComponent.loadTimeline(ctx);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    @Transactional
    public UpsertRecordDTO upsertRecord(@Nonnull UpsertRequestContext ctx) {
        return executionService.execute(ctx);
    }

    /**
     * @param ctx - record get request context for copy
     * @return {@link UpsertRecordDTO}
     */
    @Override
    public List<UpsertRecordDTO> copyRecord(GetRequestContext ctx) {
        // TODO: @Modules
//        try {
//            return recordsComponent.copyRecord(ctx);
//        } catch (DataProcessingException exc) {
//            if (exc.getId() == ExceptionId.EX_DATA_ETALON_COPY_FAILED && exc.getArgs()[0] instanceof RecordKeys) {
//                reindexEtalon((RecordKeys) exc.getArgs()[0]);
//            }
//            throw exc;
//        }
        return null;
    }
    /**
     * @param ctx - apply draft version to record
     * @return {@link UpsertRecordDTO}
     */
    @Override
    public List<UpsertRecordDTO> applyDraftRecord(GetRequestContext ctx) {
        // TODO: @Modules
//        try {
//            return recordsComponent.applyDraft(ctx);
//        } catch (Exception exc) {
//            reindexEtalon(ctx);
//            throw exc;
//        }
        return null;
    }
// TODO: @Modules
//    /**
//     * {@inheritDoc}
//     */
//    @Override
//    public List<DataQualityError> getDQErrors(String id, String entity, Date date) {
//        return recordsComponent.extractDQErrors(id, entity, date);
//    }

    /**
     * {@inheritDoc}
     */
    @Override
    @Transactional
    public RestoreRecordDTO restore(RestoreRecordRequestContext ctx) {
        return executionService.execute(ctx);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public SplitRecordsDTO detachOrigin(SplitRecordRequestContext ctx) {
       return null;//recordsComponent.splitRecord(ctx);// TODO: @Modules
    }

    @Override
    public boolean reindexEtalon(RecordIdentityContext ctx) {
        return false;//recordsComponent.reindexEtalon(ctx);// TODO: @Modules
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void reapplyEtalon(UpsertRequestContext ctx) {
        // TODO: @Modules
//        ctx.putToStorage(StorageId.DATA_UPSERT_EXACT_ACTION, UpsertAction.UPDATE);
//        recordsComponent.calculateEtalon(ctx);
    }

    @Override
    public List<String> selectCovered(List<String> etalonIds, LocalDateTime from, LocalDateTime to, boolean full) {

        if (CollectionUtils.isEmpty(etalonIds)) {
            return Collections.emptyList();
        }

        final Date fromDate = toDate(from, ValidityPeriodUtils.getGlobalValidityPeriodStart());
        final Date toDate = toDate(to, ValidityPeriodUtils.getGlobalValidityPeriodEnd());

        GetRecordsTimelinesRequestContext gtrc = GetRecordsTimelinesRequestContext.builder()
                .etalonKeys(etalonIds)
                .fetchData(false)
                .build();

        List<Timeline<OriginRecord>> timelines = commonRecordsComponent.loadTimelines(gtrc);

        return timelines.stream()
                .filter(tl -> full
                        ? tl.isFullCovered(fromDate, toDate, true)
                        : tl.selectBy(fromDate, toDate).stream().anyMatch(TimeInterval::isActive)
                )
                .filter(Timeline::isPublished)
                .map(tl -> tl.<RecordKeys>getKeys().getEtalonKey().getId())
                .collect(Collectors.toList());
    }

	@Override
	public String getRecordAsXMLString(GetRequestContext ctx) {
        // TODO: @Modules
//		GetRecordDTO getRecordDTO = getRecord(ctx);
//		EtalonRecordFull etalonRecordFull = new EtalonRecordFull();
//		EtalonRecord etalonRecord = getRecordDTO.getEtalon();
//		Map<RelationStateDTO, List<GetRelationDTO>> etalonRelations = getRecordDTO.getRelations();
//		Map<String, List<GetClassifierDTO>> etalonClassifiers = getRecordDTO.getClassifiers();
//		if (Objects.nonNull(etalonRecord)) {
//			etalonRecordFull.setEtalonRecord(JaxbDataRecordUtils.to(etalonRecord, etalonRecord.getInfoSection(),
//					com.unidata.mdm.data.EtalonRecord.class));
//		}
//		if (MapUtils.isNotEmpty(etalonRelations)) {
//			etalonRelations.values().forEach(er -> {
//				er.forEach(r -> {
//					if (r.getRelationType() == RelationType.REFERENCES
//							|| r.getRelationType() == RelationType.MANY_TO_MANY) {
//						etalonRecordFull.getRelationTo().add(
//								JaxbDataRecordUtils.to(r.getEtalon(), null, com.unidata.mdm.data.RelationTo.class));
//					} else {
//						etalonRecordFull.getIntegralRecord().add(JaxbDataRecordUtils.to(r.getEtalon(),
//								r.getEtalon().getInfoSection(), com.unidata.mdm.data.IntegralRecord.class));
//					}
//				});
//			});
//		}
//		if (MapUtils.isNotEmpty(etalonClassifiers)) {
//			etalonClassifiers.values().stream().forEach(ec -> {
//				ec.stream().forEach(c -> {
//					etalonRecordFull.getEtalonClassifierRecord().add(JaxbDataRecordUtils.to(c.getEtalon(),
//							c.getEtalon().getInfoSection(), com.unidata.mdm.data.EtalonClassifierRecord.class));
//				});
//			});
//		}
//		String result = JaxbUtils.marshalEtalonRecordFull(etalonRecordFull);
//		return result;
        return null;
	}

    // TODO: @Modules
    //    @Override
    //    public void reindexModifiedEtalons(final CommonDependentContext ctx) {
    //        if (ctx == null) {
    //            return;
    //        }
    //        flatCollectCtxs(ctx).forEach(this::reindexEtalon);
    //    }
    //
    //    private void reindexEtalon(@Nonnull RecordKeys keys) {
    //        GetRequestContext rollbackCtx = GetRequestContext.builder()
    //                    .entityName(keys.getEntityName())
    //                    .lsn(keys.getEtalonKey().getLsn())
    //                    .shard(keys.getShard())
    //                    .etalonKey(keys.getEtalonKey().getId())
    //                    .originKey(keys.getOriginKey().getId())
    //                    .sourceSystem(keys.getOriginKey().getSourceSystem())
    //                    .externalId(keys.getOriginKey().getExternalId())
    //                    .build();
    //        reindexEtalon(rollbackCtx);
    //    }
    //
    //    private List<RecordIdentityContext> flatCollectCtxs(final CommonRequestContext ctx) {
    //
    //        if (ctx == null) {
    //            return Collections.emptyList();
    //        }
    //
    //        final List<CommonRequestContext> ctxs = ctx.getDependencies();
    //        if (CollectionUtils.isEmpty(ctxs)) {
    //            return ctx instanceof RecordIdentityContext
    //                    ? Collections.singletonList((RecordIdentityContext) ctx)
    //                    : Collections.emptyList();
    //        }
    //
    //        return Stream.concat((ctx instanceof RecordIdentityContext ? Stream.of((RecordIdentityContext) ctx) : Stream.empty()),
    //                ctxs.stream().flatMap(c -> this.flatCollectCtxs(c).stream()))
    //                .collect(Collectors.toList());
    //    }


        private Date toDate(LocalDateTime from, Date defaultDate) {
        if (from == null) {
            return defaultDate;
        }
        return Date.from(from.atZone(ZoneId.systemDefault()).toInstant());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public RecordsBulkResultDTO batchUpsertRecords(@Nonnull List<UpsertRequestContext> ctxs) {
        return batchUpsertRecords(ctxs, true);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordsBulkResultDTO batchUpsertRecords(@Nonnull List<UpsertRequestContext> ctxs, boolean abortOnFailure) {
        RecordUpsertBatchSetAccumulator accumulator = new RecordUpsertBatchSetAccumulator(ctxs.size(), false);
        accumulator.setAbortOnFailure(abortOnFailure);
        accumulator.setBatchSetSize(BatchSetSize.SMALL);
        accumulator.statistics().collectResults(true);
        accumulator.charge(ctxs);
        return batchUpsertRecords(accumulator);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordsBulkResultDTO batchUpsertRecords(RecordUpsertBatchSetAccumulator accumulator) {
        return recordsUpsertConnectorExecutor.execute(accumulator, null);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordsBulkResultDTO batchDeleteRecords(List<DeleteRequestContext> ctxs) {
        return batchDeleteRecords(ctxs, true);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordsBulkResultDTO batchDeleteRecords(List<DeleteRequestContext> ctxs, boolean abortOnFailure) {
        RecordDeleteBatchSetAccumulator accumulator = new RecordDeleteBatchSetAccumulator(ctxs.size());
        accumulator.setAbortOnFailure(abortOnFailure);
        accumulator.setBatchSetSize(BatchSetSize.SMALL);
        accumulator.statistics().collectResults(true);
        accumulator.charge(ctxs);
        return batchDeleteRecords(accumulator);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordsBulkResultDTO batchDeleteRecords(RecordDeleteBatchSetAccumulator accumulator) {
        return recordsDeleteConnectorExecutor.execute(accumulator, null);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordsBulkResultDTO batchMergeRecords(List<MergeRequestContext> ctxs) {
        return batchMergeRecords(ctxs, true);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordsBulkResultDTO batchMergeRecords(List<MergeRequestContext> ctxs, boolean abortOnFailure) {
        RecordMergeBatchSetAccumulator accumulator = new RecordMergeBatchSetAccumulator(ctxs.size());
        accumulator.setAbortOnFailure(abortOnFailure);
        accumulator.setBatchSetSize(BatchSetSize.SMALL);
        accumulator.statistics().collectResults(true);
        accumulator.charge(ctxs);
        return batchMergeRecords(accumulator);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordsBulkResultDTO batchMergeRecords(RecordMergeBatchSetAccumulator accumulator) {
        return recordsMergeConnectorExecutor.execute(accumulator, null);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordsBulkResultDTO batchRestoreRecords(List<RestoreRecordRequestContext> ctxs) {
        return batchRestoreRecords(ctxs, true);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordsBulkResultDTO batchRestoreRecords(List<RestoreRecordRequestContext> ctxs, boolean abortOnFailure) {
        RecordRestoreBatchSetAccumulator accumulator = new RecordRestoreBatchSetAccumulator(ctxs.size());
        accumulator.setAbortOnFailure(abortOnFailure);
        accumulator.setBatchSetSize(BatchSetSize.SMALL);
        accumulator.statistics().collectResults(true);
        accumulator.charge(ctxs);
        return batchRestoreRecords(accumulator);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordsBulkResultDTO batchRestoreRecords(RecordRestoreBatchSetAccumulator accumulator) {
        return recordsRestoreConnectorExecutor.execute(accumulator, null);
    }
}
